package net;

import com.alibaba.middleware.race.sync.Client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * Author :  Rocky
 * Date : 13/06/2017 16:23
 * Description :
 * Test :
 */
public class NetworkClient {

    private static final Logger log = LoggerFactory.getLogger(Client.class);

    private String ip;

    private int port;

    private ChannelHandlerContext chc;

    //接收数据的状态：0(正在接收元数据)，1(正在接收变更数据)，2(正在接收pkStart,pkEnd)，3(正在接收结果数据)，4(已经接收完结果数据)
    private int receivingDataStatus;

    //处理数据
    private DataHander dataHander;

    public NetworkClient(String ip, int port, DataHander dataHander) {
        this.ip = ip;
        this.port = port;
        this.dataHander = dataHander;
        log.info("server端ip:" + ip + "，端口:" + port);
        connect();
    }

    private void connect() {
        Executor executor = Executors.newSingleThreadExecutor();
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, executor);
        Bootstrap b = new Bootstrap();
        b.group(bossGroup);
        b.channel(NioSocketChannel.class);
        b.option(ChannelOption.TCP_NODELAY, true);
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(30 * 1024 * 1024, 0, 4, -4, 4));
                ch.pipeline().addLast(new ByteDecoder());
            }
        });

        doConnect(b);
        bossGroup.shutdownGracefully();
    }

    private void doConnect(Bootstrap b) {
        try {
            ChannelFuture f = b.connect(ip, port).sync();
            log.info("client端连接server成功");
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e1) {

            }
            log.info("连接失败，重试");
            doConnect(b);
        }
    }


    private class ByteDecoder extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            chc = ctx;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

            ByteBuf buf = (ByteBuf) msg;

            synchronized (NetworkClient.class) {
                //如果是结束符
                if (buf.readableBytes() == 1 && buf.getByte(0) == -1) {

                    //变更接收数据的状态
                    receivingDataStatus++;

                    //说明已经接收完元数据
                    if (receivingDataStatus == 1) {
                        log.info("元数据接收完成");
                    }
                    //说明已经接收完 pkStart,pkEnd
                    else if (receivingDataStatus == 2) {
                        log.info("pkStart,pkEnd接收完成");
                    }
                    //说明已经接收完变更数据
                    else if (receivingDataStatus == 3) {
                        log.info("变更数据接收完成");
                    }
                    //说明已经接收完变更数据
                    else if (receivingDataStatus == 4) {
                        log.info("结果数据接收完成");
                        chc.close();
                        dataHander.getServerResultComplete();
                    }
                    return;
                }

                //如果接收的是元数据
                if (receivingDataStatus == 0) {
                    dataHander.handleRecordMeta(buf);
                    return;
                }

                //如果接收的是pkStart,pkEnd
                if (receivingDataStatus == 1) {
                    dataHander.handlePkStartAndEnd(buf);
                    return;
                }

                //如果接收的是变更数据
                if (receivingDataStatus == 2) {
                    log.info("something wrong,不应该执行");
                    //执行insert,update,delete操作
                    dataHander.handleChangeRecord(buf);
                    return;
                }

                //如果接收的是结果数据
                if (receivingDataStatus == 3) {
                    //处理服务端发送来的结果数据
                    dataHander.handleServerResultData(buf);
                    return;
                }

            }
            buf.release();
        }


        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error(cause.getMessage(), cause);
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            log.info("与服务端链接关闭");
        }
    }

}
